Skip to content

Conversation

@jlabedo
Copy link
Contributor

@jlabedo jlabedo commented Dec 23, 2025

Why is this change proposed?

Handle projecting from multiple prooph's stream

Description of Changes

  • Implemented EventStoreMultiStreamSource as a composition over EventStoreGlobalStreamSource instances.
  • Per stream, a small page is loaded and events are interleaved globally by:
    • created_at column
    • configured stream order (tie‑breaker)
    • no ascending (final tie‑breaker)
  • Combined projection position is encoded as stream=position:g1,g2;....

The requested batch size is handled on a best-effort basis: it may be smaller or larger depending on how events are distributed across streams.

Pull Request Contribution Terms

  • I have read and agree to the contribution terms outlined in CONTRIBUTING.

@jlabedo jlabedo requested a review from dgafka December 24, 2025 09:42

$events = array_map(fn(array $tuple) => $tuple[1], $all);

return new StreamPage($events, $this->encodePositions($newPositions));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For single stream we should also encode positions same way, so if someone would add one more FromStream to existing projection, it would still works.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, I will do that in another PR, so I can add some migration code with the existing encoding

if ($partitionHeaderName !== null) {
if (count($streamAttributes) > 1) {
throw ConfigurationException::create("Projection {$projectionName} cannot be partitioned by aggregate id when multiple streams are configured");
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is no ordering guarnateed between different event logs, then what is the blocker for partition based?
We would still guarnatee order within given partition which is inside specific stream

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no blocker, we can add it in a following pr

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The blocker is the partition provider: in case of multiple streams, we have to find a way to merge partitions from all streams. I may do it in a following PR.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please wait with doing the changes to Partition based, as I am doing the work to enable asynchronous batched rebuild, which will require changes to Partition Provider

$extensions[] = new EventStoreMultiStreamSourceBuilder(
$map,
[$projectionName],
);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to simplify we could have EventStoreMultiStreamSourceBuilder::create([$streamTrackerBuilders]), and if there is only single one passed, we simply return it (no need for if else then)

dgafka
dgafka previously approved these changes Dec 26, 2025
# Conflicts:
#	packages/PdoEventSourcing/src/Config/ProophProjectingModule.php
#	packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreGlobalStreamSource.php
@jlabedo jlabedo requested a review from dgafka January 5, 2026 17:16
* @return ($attributeClassName is null ? list<object> : list<T>)
*/
public function getAnnotationsForClass(string $className): array;
public function getAnnotationsForClass(string $className, ?string $attributeClassName = null): array;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added this parameter to allow finding repeated attributes on the same class.

foreach ($this->sources as $stream => $source) {
$orderIndex[$stream] = $i++;

$limit = (int)ceil($count / max(1, count($this->sources))) + 5;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where does this magic 5, comes from? :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From AI :)
This is the minimum events to fetch per stream, I will add a constant

if ($a->timestamp === $b->timestamp) {
return $orderIndex[$aStream] <=> $orderIndex[$bStream];
}
return $a->timestamp <=> $b->timestamp;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should even bother with ordering that, if that's not really true order - it will all depends on the positions in the stream. E.g. in one load batch, we may sort event from 2024 and 2026.

For non-rebuild scenarios, it may create feeling for end users that it's indeed somehow sorted, but it's really not and rebuild of the projection will point all assumptions like this out.
In Kafka joining different streams, there is no order guarantee, not even sorting like this being done. You want to join streams, then design the system in a way, it does not depend on the order between those (basically treat them like a separate partitions)

But not a blocker, if you feel ordering make sense

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is for prooph compatibility.

@dgafka dgafka merged commit 42ad32d into ecotoneframework:main Jan 6, 2026
7 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants